package com.jhf.youke.hbase.domain.service;

import cn.hutool.json.JSONUtil;
import com.jhf.youke.hbase.domain.model.dto.Column;
import com.jhf.youke.hbase.domain.model.dto.ColumnFamily;
import com.jhf.youke.hbase.domain.model.dto.HbaseDto;
import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
* @Description:
* @Param:
* @return:
* @Author: RHJ
* @Date: 2022/11/15
*/
@Component
@Log4j2
public class HbaseService {

    public static Connection connection = HbaseConnection.getConnection() ;


    public  void createNameSpace(String ns) throws Exception{
        Admin admin = connection.getAdmin();
        NamespaceDescriptor descriptor = NamespaceDescriptor.create(ns).build();
        try {
            admin.createNamespace(descriptor);
        }catch (Exception e){
            log.error(e.getMessage());
        }
        admin.close();
    }

    public  Boolean isTableExists(String ns, String name) throws Exception{
        Admin admin = connection.getAdmin();
        Boolean res = admin.tableExists(TableName.valueOf(ns,name));
        admin.close();
        return res;
    }

    /**
    * @Description:   创建表
    * @Param: [ns, name, columnFamilies]
    * @return: void
    * @Author: RHJ
    * @Date: 2022/11/15
    */
    public  void createTable(String ns, String name, String... columnFamilies) throws Exception{
        assert columnFamilies.length !=0 : new RuntimeException("至少需要一个列族");

        Admin admin = connection.getAdmin();
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(ns, name));
        for(String cf : columnFamilies){
            ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
            columnFamilyBuilder.setMaxVersions(3);
            builder.setColumnFamily(columnFamilyBuilder.build());
        }
        admin.createTable(builder.build());
        admin.close();

    }

    public  void dropTable(String ns, String name) throws Exception{
        Admin admin = connection.getAdmin();
        admin.deleteTable(TableName.valueOf(ns,name));
    }

    public void putCells(HbaseDto dto) throws Exception{
        if(isTableExists(dto.getNameSpace(), dto.getTableName())){
            Table table = connection.getTable(TableName.valueOf(dto.getNameSpace(), dto.getTableName()));
            List<Put> list = new ArrayList<>();
            for(ColumnFamily columnFamily: dto.getColumnFamilyList()){
                for(Column column: columnFamily.getColumns()){
                    Put put = new Put(Bytes.toBytes(dto.getId()));
                    put.addColumn(Bytes.toBytes(columnFamily.getFamilyName()),Bytes.toBytes(column.getName()),Bytes.toBytes(column.getValue()));
                    list.add(put);
                }
            }
            table.put(list);
            table.close();
        }
    }

    public  void putCell(String ns, String tableName,String rowKey, String columnFamily, String column, String value)throws Exception{
        if(isTableExists(ns, tableName)){
            Table table = connection.getTable(TableName.valueOf(ns,tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
            table.put(put);
            table.close();
        }

    }

    public  String getCells(String ns, String tableName,String rowKey, String columnFamily, String column)throws Exception{
        if(isTableExists(ns, tableName)){
            Table table = connection.getTable(TableName.valueOf(ns,tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            get.readAllVersions();
            Result result = table.get(get);
            Cell[] cells = result.rawCells();
            for(Cell cell : cells){
                String family = new String(CellUtil.cloneFamily(cell));
                String val = new String(CellUtil.cloneValue(cell));
                log.info("family {}, val {}", family, val);
            }
            table.close();
            return JSONUtil.toJsonStr(cells);
        }else {
            return "";
        }

    }

    /**
    * @Description:  扫描全表
    * @Param: [ns, tableName, startRow 开始行, stopRow 结束行]
    * @return: void
    * @Author: RHJ
    * @Date: 2022/12/13
    */
    public  void scanRows(String ns, String tableName,String startRow, String stopRow)throws Exception{
        if(isTableExists(ns, tableName)){
            Table table = connection.getTable(TableName.valueOf(ns,tableName));
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRow));
            scan.withStopRow(Bytes.toBytes(stopRow));
            ResultScanner scanner = table.getScanner(scan);
            for(Result result : scanner){
                for(Cell cell : result.rawCells()){
                    String row = new String(CellUtil.cloneRow(cell));
                    String family = new String(CellUtil.cloneFamily(cell));
                    String column = new String(CellUtil.cloneQualifier(cell));
                    String val = new String(CellUtil.cloneValue(cell));
                    log.info("row {}, family {}, column{}, val {}",row, family, column, val);
                }
            }
            table.close();
        }else{

        }
    }




}
